#include <boost/fiber/all.hpp>

#include "zmq_messenger_impl.h"
#include "app_common.h"
#include "loghelper.h"
#include "messagebus.h"

using namespace RockLog;

ZMQMessengerImpl::ZMQMessengerImpl()
    :m_context(1), m_publisher(m_context, ZMQ_PUB)
{
    m_publisher.bind(kZMQServerURL);       
    LOG(kInfo) << "bind url:" << kZMQServerURL;
}

void ZMQMessengerImpl::subcribeMsg(const std::string &workerId)
{
    std::string url = "ipc://" + workerId + ".ipc";
    zmq::context_t context(1);
    zmq::socket_t subscriber(context, ZMQ_SUB);
    subscriber.connect(url);
    subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);
    LOG(kInfo) << "subcribe url: " << url;
    while (true)
    {
        zmq::message_t msg;
        subscriber.recv(&msg);
        g_messagebus.sendMessage(kZmqRecvMSG, workerId.c_str(), (const char*)msg.data(), (uint32_t)msg.size());
    }
}

void ZMQMessengerImpl::publishMsg(const std::string &workerId, const std::string& msg)
{
    if (workerId.length() != kWorkIdLength)
    {
        LOG(kErr) << "invalid workerId length!";
        return;
    }
    zmq::message_t zmq_msg(msg.size() + kWorkIdLength);
    memcpy((char *)zmq_msg.data(), workerId.c_str(), kWorkIdLength);
    memcpy((char *)zmq_msg.data() + kWorkIdLength, msg.c_str(), msg.size());
    m_publisher.send(zmq_msg);
}

void ZMQMessengerImpl::publishMsg(const std::string &workerId, const char* msg, uint32_t len)
{
    if (workerId.length() != kWorkIdLength)
    {
        LOG(kErr) << "invalid workerId length!";
        return;
    }
    zmq::message_t zmq_msg(len + kWorkIdLength);
    memcpy((char *)zmq_msg.data(), workerId.c_str(), kWorkIdLength);
    memcpy((char *)zmq_msg.data() + kWorkIdLength, msg, len);
    m_publisher.send(zmq_msg);
}

